/**
 * <p>Title: HBaseTemplate.java</p>
 * <p>Description: </p>
 * <p>Copyright: Copyright (c) 2017</p>
 * <p>Company: www.zto.com</p>
 */
package com.zto.boot.hbase;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import com.zto.boot.hbase.annotation.AnnotationParser;
import com.zto.boot.hbase.exception.HBaseClientException;
import com.zto.boot.hbase.function.GetFunction;
import com.zto.boot.hbase.function.DefaultFunction;
import org.hbase.async.*;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


/**
 * <p>Class: HBaseTemplate</p>
 * <p>Description: Template for all HBase operations</p>
 *
 * @author xiaowenke@zto.cn
 * @version 1.0
 * @date 2018/5/19
 */
public class HBaseTemplate implements InitializingBean, DisposableBean {

    /** Underlying asynchbase client; created in {@link #afterPropertiesSet()}. */
    private HBaseClient client;

    /** Client configuration handed to the {@link HBaseClient} constructor. */
    private final Config config;

    /**
     * Creates a template that will build its client from the given configuration.
     * The client itself is not created until {@link #afterPropertiesSet()} runs.
     *
     * @param config asynchbase client configuration
     */
    public HBaseTemplate(Config config) {
        this.config = config;
    }

    /**
     * Inserts a list of entities and blocks until the writes have completed.
     *
     * @param list entities to insert
     * @throws Exception if flushing or any individual put fails
     */
    public <T> void put(List<T> list) throws Exception {
        put(list, null);
    }

    /**
     * Inserts a list of entities.
     * When {@code func} is non-null the call does not block; {@code func} is invoked from the
     * callback chain of the client's flush. When {@code func} is null the call blocks.
     *
     * @param list entities to insert
     * @param func optional callback; {@code null} selects blocking mode
     * @throws Exception in blocking mode, if flushing or any individual put fails
     */
    public <T, R> void put(List<T> list, DefaultFunction<R> func) throws Exception {
        Set<PutRequest> requests = new HashSet<>();
        for (T t : list) {
            requests.addAll(AnnotationParser.parsePutRequest(t));
        }
        flush(requests, func);
    }

    /**
     * Inserts a single entity and blocks until the write has completed.
     *
     * @param t entity instance
     * @throws Exception if flushing or any individual put fails
     */
    public <T> void put(T t) throws Exception {
        put(t, null);
    }

    /**
     * Inserts a single entity.
     * When {@code func} is non-null the call does not block; {@code func} is invoked from the
     * callback chain of the client's flush. When {@code func} is null the call blocks.
     *
     * @param t    entity instance
     * @param func optional callback; {@code null} selects blocking mode
     * @throws Exception in blocking mode, if flushing or any individual put fails
     */
    public <T, R> void put(T t, DefaultFunction<R> func) throws Exception {
        Set<PutRequest> requests = AnnotationParser.parsePutRequest(t);
        flush(requests, func);
    }

    /**
     * Submits the given put requests and flushes the client.
     * In blocking mode ({@code func == null}) every put's own {@link Deferred} is joined after
     * the flush, so that a failed put is reported to the caller instead of being dropped.
     *
     * @param requests put requests to submit
     * @param func     optional callback attached to the flush; {@code null} selects blocking mode
     * @throws Exception in blocking mode, if the flush or any put fails
     */
    private <R> void flush(Set<PutRequest> requests, DefaultFunction<R> func) throws Exception {
        List<Deferred<Object>> pending = new ArrayList<>(requests.size());
        for (PutRequest request : requests) {
            pending.add(client.put(request));
        }

        if (func != null) {
            client.flush().addCallbacks(o -> func.apply(o, null),
                    (Callback<R, Exception>) e -> func.apply(null, e));
            return;
        }

        client.flush().join();
        // Waiting on the flush alone does not show whether each put succeeded;
        // join the individual results as delete(...) does.
        for (Deferred<Object> deferred : pending) {
            deferred.join();
        }
    }

    /**
     * Deletes rows by row key and blocks until every delete has completed.
     *
     * @param clazz   entity class
     * @param rowKeys row keys to delete
     * @throws HBaseClientException if any delete fails
     */
    public <T> void delete(Class<T> clazz, String... rowKeys) {
        delete(clazz, null, rowKeys);
    }

    /**
     * Deletes rows by row key.
     * When {@code func} is non-null the call does not block; {@code func} is invoked from the
     * callback chain of the client's flush. When {@code func} is null the call blocks until
     * every delete has completed.
     *
     * @param clazz   entity class
     * @param func    optional callback; {@code null} selects blocking mode
     * @param rowKeys row keys to delete
     * @throws HBaseClientException in blocking mode, if any delete fails
     */
    public <T, R> void delete(Class<T> clazz, DefaultFunction<R> func, String... rowKeys) {
        List<Deferred<Object>> deferredList = new ArrayList<>();
        for (String rowkey : rowKeys) {
            DeleteRequest request = AnnotationParser.parseDeleteRequest(clazz, rowkey);
            deferredList.add(client.delete(request));
        }
        if (func != null) {
            client.flush().addCallbacks(o -> func.apply(o, null),
                    (Callback<R, Exception>) e -> func.apply(null, e));
            return;
        }
        client.flush();
        for (Deferred<Object> deferred : deferredList) {
            try {
                deferred.join();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller even though it is wrapped below.
                    Thread.currentThread().interrupt();
                }
                throw new HBaseClientException("delete exception", e);
            }
        }
    }

    /**
     * Reads one row and maps it to an entity, blocking until the result is available.
     *
     * @param clazz  entity class
     * @param rowKey row key
     * @return the mapped entity as produced by {@code AnnotationParser.parseGetResponse}
     */
    public <T> T get(Class<T> clazz, String rowKey) {
        return get(clazz, rowKey, null);
    }

    /**
     * Reads one row and maps it to an entity.
     * When {@code func} is non-null the call does not block: the mapped entity (or the error)
     * is handed to {@code func} and this method returns {@code null}. When {@code func} is
     * null the call blocks and returns the mapped entity.
     *
     * @param clazz  entity class
     * @param rowKey row key
     * @param func   optional callback; {@code null} selects blocking mode
     * @return the mapped entity in blocking mode; always {@code null} in callback mode
     */
    public <T, R> T get(Class<T> clazz, String rowKey, GetFunction<T, R> func) {

        Deferred<ArrayList<KeyValue>> deferred = client.get(AnnotationParser.parseGetRequest(clazz, rowKey));

        if (func != null) {
            deferred.addCallbacks(keyValues -> func.apply(AnnotationParser.parseGetResponse(clazz, keyValues), null),
                    (Callback<R, Exception>) e -> func.apply(null, e)
            );
            client.flush();
            return null;
        }

        try {
            ArrayList<KeyValue> keyValues = deferred.joinUninterruptibly();
            return AnnotationParser.parseGetResponse(clazz, keyValues);
        } catch (Exception e) {
            throw HBaseClientException.exception("查询Hbase异常rowKey=" + rowKey + "{0}", e);
        }
    }

    /**
     * Scans a whole table and maps every row to an entity.
     *
     * @param clazz     entity class
     * @param tableName name of the HBase table
     * @return list of mapped entities
     * @throws Exception if fetching or mapping rows fails
     */
    public <T> List<T> scanner(Class<T> clazz, String tableName) throws Exception {
        Scanner scanner = client.newScanner(tableName);
        return scanner(clazz, scanner);
    }

    /**
     * Drains the given scanner and maps every row to an entity.
     * Rows are fetched batch by batch until {@code nextRows()} yields {@code null}.
     * If fetching or mapping fails, the scanner is closed before the exception is rethrown.
     *
     * @param clazz   entity class
     * @param scanner scanner to read from, already configured by the caller
     * @return list of mapped entities
     * @throws Exception if fetching or mapping rows fails
     */
    public <T> List<T> scanner(Class<T> clazz, Scanner scanner) throws Exception {
        List<T> list = new ArrayList<>();
        try {
            ArrayList<ArrayList<KeyValue>> rows;
            while ((rows = scanner.nextRows().joinUninterruptibly()) != null) {
                for (ArrayList<KeyValue> row : rows) {
                    list.add(AnnotationParser.parseGetResponse(clazz, row));
                }
            }
        } catch (Exception e) {
            // The scan was abandoned part-way; close the scanner rather than leaving it open.
            scanner.close();
            throw e;
        }
        return list;
    }

    /**
     * Returns the underlying HBase client for operations this template does not cover.
     *
     * @return the client, or {@code null} if {@link #afterPropertiesSet()} has not run yet
     */
    public HBaseClient getClient() {
        return client;
    }

    /**
     * Callback that receives the raw client.
     * Declared public so that code outside this package can pass an implementation to the
     * public {@link #execute(CallBack)} method.
     *
     * @param <T> result type of the callback
     */
    @FunctionalInterface
    public interface CallBack<T> {
        T doInTable(HBaseClient client);
    }

    /**
     * Runs the given callback against the raw client, for cases the other methods do not cover.
     *
     * @param callBack callback implementation
     * @return whatever the callback returns
     */
    public HBaseClient execute(CallBack<HBaseClient> callBack) {
        final HBaseClient client = this.client;
        return callBack.doInTable(client);
    }

    /**
     * Shuts the client down and waits for the shutdown to finish.
     * Shutdown failures are deliberately not propagated; an interrupt is restored on the
     * current thread.
     */
    @Override
    public void destroy() throws Exception {
        if (this.client == null) {
            return;
        }
        try {
            this.client.shutdown().join();
        } catch (InterruptedException e) {
            // Do not lose the interrupt; the container may still be shutting other beans down.
            Thread.currentThread().interrupt();
        } catch (Exception ignored) {
            // Best-effort shutdown: a failure here must not abort disposal of the bean.
        }
    }

    /**
     * Creates the HBase client from the configuration supplied to the constructor.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.client = new HBaseClient(this.config);
    }
}
